from __future__ import annotations

import asyncio
import inspect
import time
import uuid
from collections import defaultdict
from collections.abc import Awaitable, Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar

from fastapi import Request
from fastapi.responses import Response
from fastapi.templating import Jinja2Templates
from typing_extensions import Self

from . import background_tasks, binding, core, helpers, json, storage
from .awaitable_response import AwaitableResponse
from .dependencies import generate_resources
from .element import Element
from .favicon import get_favicon_url
from .javascript_request import JavaScriptRequest
from .logging import log
from .observables import ObservableDict
from .outbox import Outbox
from .sub_pages_router import SubPagesRouter
from .translations import translations
from .version import __version__

if TYPE_CHECKING:
    from .page import page

# Jinja2 templates are loaded from the ``templates`` directory next to this module.
templates = Jinja2Templates(Path(__file__).parent / 'templates')

# Translation table applied to the serialized element JSON in ``Client.build_response``
# before it is handed to the page template.
# NOTE(review): besides the usual HTML characters, the backtick and ``$`` are escaped as well,
# presumably because the JSON ends up inside a JavaScript template literal - confirm against ``index.html``.
HTML_ESCAPE_TABLE = str.maketrans({
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    '`': '&#96;',
    '$': '&#36;',
})


class ClientConnectionTimeout(TimeoutError):
    """Raised when a client did not establish its connection in time.

    The affected client is kept on the exception as ``client``.
    """

    def __init__(self, client: Client) -> None:
        self.client = client
        message = f'ClientConnectionTimeout: {client.id}'
        super().__init__(message)


class Client:
    """Server-side state of a single page client.

    A client owns its elements, an outbox for outgoing messages, the connection bookkeeping
    and the connect/disconnect/delete handlers.
    Every instance registers itself in ``instances`` under its ID.
    """

    page_routes: ClassVar[dict[Callable[..., Any], str]] = {}
    '''Maps page builders to their routes.'''

    instances: ClassVar[dict[str, Client]] = {}
    '''Maps client IDs to clients.'''

    shared_head_html = ''
    '''HTML to be inserted in the <head> of every page template.'''

    shared_body_html = ''
    '''HTML to be inserted in the <body> of every page template.'''

    def __init__(self, page: page, *, request: Request | None = None) -> None:
        """Create a client for the given page and register it in ``Client.instances``.

        :param page: the page this client belongs to
        :param request: the request that led to this client (optional; the ``request`` property raises if it is ``None``)
        """
        self._request = request
        self.id = str(uuid.uuid4())
        self.created = time.time()
        self.instances[self.id] = self  # NOTE: ``instances`` is the class-level registry, so this registers the client globally

        # element registry and connection bookkeeping
        self.elements: dict[int, Element] = {}
        self.next_element_id: int = 0
        self._waiting_for_connection = asyncio.Event()
        self._waiting_for_disconnect = asyncio.Event()
        self._connected = asyncio.Event()
        self._deleted_event = asyncio.Event()
        self.environ: dict[str, Any] | None = None
        self.on_air = False
        self._num_connections: defaultdict[str, int] = defaultdict(int)  # open connections per document ID
        self._delete_tasks: dict[str, asyncio.Task] = {}  # pending deletion tasks per document ID
        self._deleted = False
        self._socket_to_document_id: dict[str, str] = {}
        self.tab_id: str | None = None  # ``has_socket_connection`` is derived from this value

        self.page = page
        self.outbox = Outbox(self)

        # default layout skeleton: q-layout > q-page-container > q-page > div.nicegui-content
        # NOTE: ``with client:`` enters ``self.content`` (see ``__enter__``)
        with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
            with Element('q-page-container') as self.page_container:
                with Element('q-page'):
                    self.content = Element('div').classes('nicegui-content')

        self.title: str | None = None  # ``None`` means: fall back to the page's title (see ``resolve_title``)

        # client-specific HTML which is appended to the shared HTML (see ``head_html`` and ``body_html``)
        self._head_html = ''
        self._body_html = ''

        self.storage = ObservableDict()

        # lifecycle handlers, invoked via ``safe_invoke``
        self.connect_handlers: list[Callable[..., Any] | Awaitable] = []
        self.disconnect_handlers: list[Callable[..., Any] | Awaitable] = []
        self.delete_handlers: list[Callable[..., Any] | Awaitable] = []

        # if set, ``run_javascript`` targets this socket ID instead of the client ID
        self._temporary_socket_id: str | None = None

        # the router is created inside the client's content context
        with self:
            self.sub_pages_router = SubPagesRouter(request)

    @property
    def request(self) -> Request:
        """The request object for the client."""
        if self._request is None:
            raise RuntimeError('Request is not set')
        return self._request

    @property
    def ip(self) -> str:
        """The IP address of the client.

        *Updated in version 2.0.0: The IP address is available even before the client connects.*
        *Updated in version 3.0.0: The IP address is always defined (never ``None``).*
        """
        return self.request.client.host if self.request.client is not None else ''

    @property
    def has_socket_connection(self) -> bool:
        """Whether the client is connected."""
        return self.tab_id is not None

    @property
    def head_html(self) -> str:
        """The HTML code to be inserted in the <head> of the page template."""
        return self.shared_head_html + self._head_html

    @property
    def body_html(self) -> str:
        """The HTML code to be inserted in the <body> of the page template."""
        return self.shared_body_html + self._body_html

    def __enter__(self) -> Self:
        self.content.__enter__()
        return self

    def __exit__(self, *_) -> None:
        self.content.__exit__()

    def build_response(self, request: Request, status_code: int = 200) -> Response:
        """Build a FastAPI response for the client."""
        self.outbox.updates.clear()
        prefix = request.headers.get('X-Forwarded-Prefix', '') + request.scope.get('root_path', '')
        elements = json.dumps({
            id: element._to_dict() for id, element in self.elements.items()  # pylint: disable=protected-access
        })
        socket_io_js_query_params = {
            **core.app.config.socket_io_js_query_params,
            'client_id': self.id,
            'next_message_id': self.outbox.next_message_id,
        }
        vue_html, vue_styles, vue_scripts, imports, js_imports, js_imports_urls = \
            generate_resources(prefix, self.elements.values())
        return templates.TemplateResponse(
            request=request,
            name='index.html',
            context={
                'request': request,
                'version': __version__,
                'elements': elements.translate(HTML_ESCAPE_TABLE),
                'head_html': self.head_html,
                'body_html': '<style>' + '\n'.join(vue_styles) + '</style>\n' + self.body_html + '\n' + '\n'.join(vue_html),
                'vue_scripts': '\n'.join(vue_scripts),
                'imports': json.dumps(imports),
                'js_imports': '\n'.join(js_imports),
                'js_imports_urls': js_imports_urls,
                'vue_config': json.dumps(core.app.config.quasar_config),
                'vue_config_script': core.app.config.vue_config_script,
                'title': self.resolve_title(),
                'viewport': self.page.resolve_viewport(),
                'favicon_url': get_favicon_url(self.page, prefix),
                'dark': str(self.page.resolve_dark()),
                'language': self.page.resolve_language(),
                'translations': translations.get(self.page.resolve_language(), translations['en-US']),
                'prefix': prefix,
                'tailwind': core.app.config.tailwind,
                'prod_js': core.app.config.prod_js,
                'socket_io_js_query_params': socket_io_js_query_params,
                'socket_io_js_extra_headers': core.app.config.socket_io_js_extra_headers,
                'socket_io_js_transports': core.app.config.socket_io_js_transports,
            },
            status_code=status_code,
            headers={'Cache-Control': 'no-store', 'X-NiceGUI-Content': 'page'},
        )

    def resolve_title(self) -> str:
        """Return the title of the page."""
        return self.page.resolve_title() if self.title is None else self.title

    async def connected(self, timeout: float | None = None) -> None:
        """Block execution until the client is connected.

        :param timeout: timeout in seconds (default: ``None``)
        """
        if self.has_socket_connection:
            return
        self._waiting_for_connection.set()
        self._connected.clear()
        purpose = (self.request.headers.get('Sec-Purpose') or self.request.headers.get('Purpose') or '').lower()
        is_prefetch = 'prefetch' in purpose and 'prerender' not in purpose
        try:
            await asyncio.wait_for(self._connected.wait(), timeout=None if is_prefetch else timeout)
        except asyncio.TimeoutError as e:
            raise ClientConnectionTimeout(self) from e

    async def disconnected(self) -> None:
        """Block execution until the client disconnects."""
        if not self.has_socket_connection:
            await self.connected()
        if self.id in self.instances:
            self._waiting_for_disconnect.set()
            self._deleted_event.clear()
            await self._deleted_event.wait()

    def run_javascript(self, code: str, *, timeout: float = 1.0) -> AwaitableResponse:
        """Execute JavaScript on the client.

        If the function is awaited, the result of the JavaScript code is returned.
        Otherwise, the JavaScript code is executed without waiting for a response.

        Obviously the JavaScript code is only executed after the client is connected.
        Internally, ``await client.connected()`` is called before the JavaScript code is executed (*since version 3.0.0*).
        This might delay the execution of the JavaScript code and is not covered by the ``timeout`` parameter.

        :param code: JavaScript code to run
        :param timeout: timeout in seconds (default: 1.0)

        :return: AwaitableResponse that can be awaited to get the result of the JavaScript code
        """
        request_id = str(uuid.uuid4())
        target_id = self._temporary_socket_id or self.id

        def send_and_forget():
            self.outbox.enqueue_message('run_javascript', {'code': code}, target_id)

        async def send_and_wait():
            self.outbox.enqueue_message('run_javascript', {'code': code, 'request_id': request_id}, target_id)
            await self.connected()
            return await JavaScriptRequest(request_id, timeout=timeout)

        return AwaitableResponse(send_and_forget, send_and_wait)

    def open(self, target: Callable[..., Any] | str, new_tab: bool = False) -> None:
        """Open a new page in the client."""
        path = target if isinstance(target, str) else self.page_routes[target]
        self.outbox.enqueue_message('open', {'path': path, 'new_tab': new_tab}, self.id)

    def download(self, src: str | bytes, filename: str | None = None, media_type: str = '') -> None:
        """Download a file from a given URL or raw bytes."""
        self.outbox.enqueue_message('download', {'src': src, 'filename': filename, 'media_type': media_type}, self.id)

    def on_connect(self, handler: Callable[..., Any] | Awaitable) -> None:
        """Add a callback to be invoked when the client connects.

        The callback has an optional parameter of `nicegui.Client`.
        """
        self.connect_handlers.append(handler)

    def on_disconnect(self, handler: Callable[..., Any] | Awaitable) -> None:
        """Add a callback to be invoked when the client disconnects.

        The callback has an optional parameter of `nicegui.Client`.

        *Updated in version 3.0.0: The handler is also called when a client reconnects.*
        """
        self.disconnect_handlers.append(handler)

    def on_delete(self, handler: Callable[..., Any] | Awaitable) -> None:
        """Add a callback to be invoked when the client is deleted.

        The callback has an optional parameter of `nicegui.Client`.

        *Added in version 3.0.0*
        """
        self.delete_handlers.append(handler)

    def handle_handshake(self, socket_id: str, document_id: str, next_message_id: int | None) -> None:
        """Cancel pending disconnect task and invoke connect handlers."""
        self._waiting_for_connection.clear()
        self._connected.set()
        self._socket_to_document_id[socket_id] = document_id
        self._cancel_delete_task(document_id)
        self._num_connections[document_id] += 1
        if next_message_id is not None:
            self.outbox.try_rewind(next_message_id)
        storage.request_contextvar.set(self.request)
        for t in self.connect_handlers:
            self.safe_invoke(t)
        for t in core.app._connect_handlers:  # pylint: disable=protected-access
            self.safe_invoke(t)

    def handle_disconnect(self, socket_id: str) -> None:
        """Wait for the browser to reconnect; invoke deletion handlers if it doesn't."""
        if socket_id not in self._socket_to_document_id:
            return
        document_id = self._socket_to_document_id.pop(socket_id)
        self._cancel_delete_task(document_id)
        self._num_connections[document_id] -= 1
        self.tab_id = None

        for t in self.disconnect_handlers:
            self.safe_invoke(t)
        for t in core.app._disconnect_handlers:  # pylint: disable=protected-access
            self.safe_invoke(t)

        async def delete_content() -> None:
            await asyncio.sleep(self.page.resolve_reconnect_timeout())
            if self._num_connections[document_id] == 0:
                self._num_connections.pop(document_id)
                self._delete_tasks.pop(document_id)
                self.delete()
        self._delete_tasks[document_id] = \
            background_tasks.create(delete_content(), name=f'delete content {document_id}')

    def _cancel_delete_task(self, document_id: str) -> None:
        if document_id in self._delete_tasks:
            self._delete_tasks.pop(document_id).cancel()

    def handle_event(self, msg: dict) -> None:
        """Forward an event to the corresponding element."""
        with self:
            sender = self.elements.get(msg['id'])
            if sender is not None and not sender.is_ignoring_events:
                msg['args'] = [None if arg is None else json.loads(arg) for arg in msg.get('args', [])]
                if len(msg['args']) == 1:
                    msg['args'] = msg['args'][0]
                sender._handle_event(msg)  # pylint: disable=protected-access

    def handle_javascript_response(self, msg: dict) -> None:
        """Store the result of a JavaScript command."""
        JavaScriptRequest.resolve(msg['request_id'], msg.get('result'))

    def safe_invoke(self, func: Callable[..., Any] | Awaitable) -> None:
        """Invoke the potentially async function in the client context and catch any exceptions."""
        func_name = func.__name__ if hasattr(func, '__name__') else str(func)
        try:
            if isinstance(func, Awaitable):
                async def func_with_client():
                    with self:
                        await func
                background_tasks.create(func_with_client(), name=f'func with client {self.id} {func_name}')
            else:
                with self:
                    result = func(self) if len(inspect.signature(func).parameters) == 1 else func()
                if helpers.is_coroutine_function(func) and not isinstance(result, asyncio.Task):
                    async def result_with_client():
                        with self:
                            await result
                    background_tasks.create(result_with_client(), name=f'result with client {self.id} {func_name}')
        except Exception as e:
            core.app.handle_exception(e)

    def remove_elements(self, elements: Iterable[Element]) -> None:
        """Remove the given elements from the client."""
        element_list = list(elements)  # NOTE: we need to iterate over the elements multiple times
        binding.remove(element_list)
        for element in element_list:
            element._handle_delete()  # pylint: disable=protected-access
            element._deleted = True  # pylint: disable=protected-access
            self.outbox.enqueue_delete(element)
            self.elements.pop(element.id, None)

    def remove_all_elements(self) -> None:
        """Remove all elements from the client."""
        self.remove_elements(self.elements.values())

    def delete(self) -> None:
        """Delete a client and all its elements.

        If the global clients dictionary does not contain the client, its elements are still removed and a KeyError is raised.
        Normally this should never happen, but has been observed (see #1826).
        """
        for t in self.delete_handlers:
            self.safe_invoke(t)
        for t in core.app._delete_handlers:  # pylint: disable=protected-access
            self.safe_invoke(t)
        self._waiting_for_disconnect.clear()
        self._deleted_event.set()
        self.remove_all_elements()
        self.outbox.stop()
        del Client.instances[self.id]
        self._deleted = True

    def check_existence(self) -> None:
        """Check if the client still exists and print a warning if it doesn't."""
        if self._deleted:
            helpers.warn_once('Client has been deleted but is still being used. '
                              'This is most likely a bug in your application code. '
                              'See https://github.com/zauberzeug/nicegui/issues/3028 for more information.',
                              stack_info=True)

    @classmethod
    def prune_instances(cls, *, client_age_threshold: float = 60.0) -> None:
        """Prune stale clients."""
        try:
            stale_clients = [
                client
                for client in cls.instances.values()
                if (
                    not client.has_socket_connection and
                    not client._delete_tasks and  # pylint: disable=protected-access
                    client.created <= time.time() - client_age_threshold
                )
            ]
            for client in stale_clients:
                log.debug(f'Pruning stale client {client.id}')
                client.delete()

        except Exception:
            log.exception('Error while pruning clients')
